package com.example.flink_demo.job;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.protocol.types.Field;

import java.sql.Timestamp;
import java.util.Properties;

/**
 * @author benjamin_5
 * @Description 从kafka读取数据，输出到mysql：用户调用次数统计
 * @date 2024/9/25
 */
/**
 * Streaming word-count job: consumes whitespace-separated text messages from the
 * Kafka topic {@code flink_test}, counts occurrences per word with a keyed running
 * sum, and writes each updated (word, count) pair to the MySQL table
 * {@code world_count} while also printing it to stdout.
 *
 * <p>Connection endpoints (Kafka broker, JDBC URL, credentials) are hard-coded for
 * local development.
 */
public class KafkaJob {

    // NOTE: must reference this class, otherwise log entries are misattributed.
    private static final Log logger = LogFactory.getLog(KafkaJob.class);

    public static void main(String[] args) {
        try {
            // Create the streaming execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Explicit parallelism (default would be the number of CPU cores).
            env.setParallelism(3);

            String jdbcUrl = "jdbc:mysql://localhost:3306/test_db";
            String user = "root";
            String password = "123456";
            String driverName = "com.mysql.cj.jdbc.Driver";

            // Legacy consumer API (FlinkKafkaConsumer). On newer Flink versions prefer
            // KafkaSource.builder() + env.fromSource(...) instead.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "flink-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "latest");

            DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>(
                    "flink_test",
                    new SimpleStringSchema(),
                    props));

            SingleOutputStreamOperator<Tuple2<String, Integer>> result = stream.flatMap(
                            (String value, Collector<Tuple2<String, Integer>> out) -> {
                                // Split each message on single spaces and emit (word, 1).
                                String[] words = value.split(" ");
                                for (String word : words) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                    )
                    // Type erasure: from the lambda Flink can only infer Tuple2, not
                    // Tuple2<String, Integer> — the explicit hint restores the full type.
                    .returns(new TypeHint<Tuple2<String, Integer>>() {
                    })
                    .keyBy(value -> value.f0)
                    .sum(1);

            // Debug sink: print every updated count to stdout.
            result.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                    // Typed field access (f0/f1) instead of untyped getField(int).
                    String word = value.f0;
                    Integer count = value.f1;
                    System.out.println("单词：" + word + "，次数：" + count);
                }
            });

            // JDBC sink: upsert-style append of each (word, count, timestamp) row.
            result.addSink(JdbcSink.sink(
                    "insert into world_count (name,number,create_time) values (?,?,?)",
                    (ps, t) -> {
                        String name = t.f0;
                        Integer count = t.f1;
                        System.out.println("结果：" + name + "," + count);

                        ps.setString(1, name);
                        ps.setInt(2, count);
                        // Current wall-clock time, without going through java.util.Date.
                        ps.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
                    },
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl(jdbcUrl)
                            .withUsername(user)
                            .withPassword(password)
                            .withDriverName(driverName)
                            .build()
            ));

            // Launch the job; blocks until the streaming job terminates.
            env.execute("kafka stream job");

        } catch (Exception e) {
            // Log with the cause attached; printStackTrace() is redundant with logging.
            logger.error("流任务执行失败：", e);
        }
    }
}
